// ball_broadcastobserver.h                                           -*-C++-*-
#ifndef INCLUDED_BALL_BROADCASTOBSERVER
#define INCLUDED_BALL_BROADCASTOBSERVER

#include <bsls_ident.h>
BSLS_IDENT("$Id: $")

//@PURPOSE: Provide a broadcast observer that forwards to other observers.
//
//@CLASSES:
//  ball::BroadcastObserver: observer that forwards to other observers
//
//@SEE_ALSO: ball_record, ball_context, ball_observer,
//           ball_loggermanager
//
//@DESCRIPTION: This component provides a concrete implementation of the
// `ball::Observer` protocol for receiving and processing log records:
// ```
//                ,-----------------------.
//               ( ball::BroadcastObserver )
//                `-----------------------'
//                           |             ctor
//                           |             registerObserver
//                           |             deregisterObserver
//                           |             deregisterAllObservers
//                           |             findObserver
//                           |             numRegisteredObservers
//                           |             visitObservers
//                           V
//                    ,--------------.
//                   ( ball::Observer )
//                    `--------------'
//                                         dtor
//                                         publish
//                                         releaseRecords
// ```
// `ball::BroadcastObserver` is a concrete class derived from `ball::Observer`
// that processes the log records it receives through its `publish` method by
// forwarding them to other concrete observers.  `ball::BroadcastObserver`
// maintains a registry of named observers to which it forwards log records.
// Clients of `ball::BroadcastObserver` register observers using the
// `registerObserver` method and unregister observers with the
// `deregisterObserver` method.  Once registered, an observer receives all log
// records that its associated broadcast observer receives.
//
///Thread Safety
///-------------
// `ball::BroadcastObserver` is thread-safe, meaning that multiple threads may
// share the same instance, or may have their own instances (see
// `bsldoc_glossary`).
//
///Usage
///-----
// In this section we show intended use of this component.
//
///Example 1: Delayed Observer Configuration
///- - - - - - - - - - - - - - - - - - - - -
// In this example, we will show how `ball::BroadcastObserver` can be used to
// implement delayed observer configuration.
//
// First, we define an elided custom observer that conforms to the
// `ball::Observer` protocol and supports a `configure` method:
// ```
// class ConfigurableObserver : public ball::Observer {
//     // DATA
//     bool d_configureFlag;   // configuration completion flag
//
//   public:
//     // CREATORS
//     ConfigurableObserver() : d_configureFlag(false)
//     {
//     }
//
//     // MANIPULATORS
//
//     // Configure this observer.
//     void configure() {
//         d_configureFlag = true;
//     }
//
//     using Observer::publish;  // avoid hiding base class method
//
//     /// Publish the specified `record` with the specified `context`.
//     void publish(const bsl::shared_ptr<ball::Record>& record,
//                  const ball::Context&                 context)
//     {
//         // Do not publish any records until configuration has been done.
//         if (!d_configureFlag) {
//             return;                                               // RETURN
//         }
//         // Publish the record.
//         // ...
//     }
//
//     // ACCESSORS
//     bool isConfigured() const
//     {
//         return d_configureFlag;
//     }
// };
// ```
// Then, we create a shared pointer to a `ConfigurableObserver` object and
// register it with a broadcast observer:
// ```
// bsl::shared_ptr<ConfigurableObserver>
//                                  myObserverPtr(new ConfigurableObserver());
//
// ball::BroadcastObserver broadcastObserver;
//
// int rc = broadcastObserver.registerObserver(myObserver, "observer");
//
// assert(0 == rc);
// ```
// Finally, we can retrieve the registered observer and configure it:
// ```
// bsl::shared_ptr<ConfigurableObserver> tmpObserverPtr;
//
// rc = broadcastObserver.findObserver(&tmpObserverPtr, "observer");
//
// assert(0 == rc);
// assert(myObserverPtr == tmpObserverPtr);
// assert(false == tmpObserverPtr->isConfigured());
//
// tmpObserverPtr->configure();
//
// assert(true == tmpObserverPtr->isConfigured());
// ```
// Note that there is an alternative way to obtain a shared pointer to the
// registered observer:
// ```
// bsl::shared_ptr<Observer> oPtr =
//                                 broadcastObserver.findObserver("observer");
//
// assert(oPtr.ptr());
//
// bsl::shared_ptr<ConfigurableObserver> anotherObserverPtr;
//
// bslstl::SharedPtrUtil::dynamicCast(&anotherObserverPtr, oPtr);
//
// assert(myObserverPtr == anotherObserverPtr);
// ```

#include <balscm_version.h>

#include <ball_observer.h>

#include <bdlb_transparentequalto.h>
#include <bdlb_transparenthash.h>

#include <bslma_allocator.h>
#include <bslma_default.h>

#include <bslmf_enableif.h>
#include <bslmf_isconvertible.h>

#include <bslmt_readerwritermutex.h>
#include <bslmt_readlockguard.h>

#include <bsls_keyword.h>

#include <bsl_memory.h>
#include <bsl_string.h>
#include <bsl_unordered_map.h>


namespace BloombergLP {
namespace ball {

class Record;
class Context;

                         // =======================
                         // class BroadcastObserver
                         // =======================

/// This class provides a broadcasting implementation of the `Observer`
/// protocol.  Other concrete observers may be registered and named with
/// this broadcast observer (`registerObserver` method), retrieved
/// (`findObserver` method), and unregistered (`deregisterObserver` method).
/// The `publish` method of this class forwards the log records that it
/// receives to the `publish` method of each registered observer.
class BroadcastObserver : public Observer {

  public:
    // TYPES

    /// This `typedef` is an alias for the type of the registry maintained
    /// by this observer.
    typedef bsl::unordered_map<bsl::string,
                               bsl::shared_ptr<Observer>,
                               bdlb::TransparentHash,
                               bdlb::TransparentEqualTo> ObserverRegistry;

  private:
    // DATA
    ObserverRegistry                 d_observers;  // observer registry

    mutable bslmt::ReaderWriterMutex d_rwMutex;    // protects concurrent
                                                   // access to `d_observers`

    // NOT IMPLEMENTED
    BroadcastObserver(const BroadcastObserver&);
    BroadcastObserver& operator=(const BroadcastObserver&);

  public:
    // CREATORS

    /// Create a broadcast observer having no registered observers.
    /// Optionally specify a `basicAllocator` used to supply memory.  If
    /// `basicAllocator` is 0, the currently installed default allocator is
    /// used.
    explicit BroadcastObserver(bslma::Allocator *basicAllocator = 0);

    /// Destroy this broadcast observer.  Note that this method has no
    /// effect on the lifetime of observers registered with this observer,
    /// if any.
    ~BroadcastObserver() BSLS_KEYWORD_OVERRIDE;

    // MANIPULATORS

    /// Remove all observers from the registry of this broadcast observer.
    void deregisterAllObservers();

    /// Remove the observer having the specified `observerName` from the
    /// registry of this broadcast observer.  Return 0 if the observer
    /// having `observerName` was successfully deregistered, and a non-zero
    /// value (with no effect) otherwise.  Henceforth, the observer that had
    /// `observerName` will no longer receive log records from this
    /// observer.
    int deregisterObserver(const bsl::string_view& observerName);

    /// Return a shared pointer to the observer having the specified
    /// `observerName` in the registry of this broadcast observer, and an
    /// empty shared pointer if there is no such observer otherwise.
    bsl::shared_ptr<Observer> findObserver(
                                         const bsl::string_view& observerName);

    /// Load into the specified `result` a shared pointer to the observer of
    /// (template parameter) `t_OBSERVER` type having the specified
    /// `observerName` in the registry of this broadcast observer, and an
    /// empty shared pointer if there is no such observer otherwise.  Return
    /// 0 if a non-empty shared pointer was loaded, and a non-zero value
    /// otherwise.  Note that an empty shared pointer will be loaded if
    /// either no observer having `observerName` is in the registry or the
    /// observer registered with that name is not of `t_OBSERVER` type.
    ///
    /// The implementation is placed here in the class definition to work
    /// around a Microsoft C++ compiler (version 16) bug where the
    /// definition cannot be matched to the declaration when an `enable_if`
    /// is used.
    template <class t_OBSERVER>
    int findObserver(
                bsl::shared_ptr<t_OBSERVER> *result,
                const bsl::string_view&      observerName,
                typename bsl::enable_if<
                    bsl::is_convertible<t_OBSERVER *, ball::Observer *>::value,
                    void *>::type = 0)
    {
        bslstl::SharedPtrUtil::dynamicCast(result, findObserver(observerName));
        return *result ? 0 : 1;
    }

    using Observer::publish;  // Avoid hiding base class method.

    /// Process the specified log `record` having the specified publishing
    /// `context` by forwarding `record` and `context` to each of the
    /// observers registered with this broadcast observer.
    void publish(const bsl::shared_ptr<const Record>& record,
                 const Context&                       context)
                                                         BSLS_KEYWORD_OVERRIDE;

    /// Add the specified `observer` with the specified `observerName` to
    /// the registry of this broadcast observer.  Return 0 if `observer` was
    /// successfully registered, and a non-zero value (with no effect)
    /// otherwise.  Henceforth, this observer will forward each record it
    /// receives through its `publish` method, including the record's
    /// context, to the `publish` method of `observer`, until `observer` is
    /// deregistered.  The behavior is undefined if a cyclic reference is
    /// created among registered observers.  Note that this method will fail
    /// if an observer having `observerName` is already registered.
    int registerObserver(const bsl::shared_ptr<Observer>& observer,
                         const bsl::string_view&          observerName);

    /// Discard any shared reference to a `Record` object that was supplied
    /// to the `publish` method, and is held by this observer.  This
    /// implementation processes `releaseRecords` by calling
    /// `releaseRecords` on each of the registered observers.  Note that
    /// this operation should be called if resources underlying the
    /// previously provided shared pointers must be released.
    void releaseRecords() BSLS_KEYWORD_OVERRIDE;

    /// Invoke the specified `visitor` functor of (template parameter)
    /// `t_VISITOR` type on each element in the registry of this broadcast
    /// observer, supplying that functor modifiable access to each observer.
    /// `visitor` must be a functor that can be called as if it had the
    /// following signature:
    /// ```
    /// void operator()(const bsl::shared_ptr<Observer>& observer,
    ///                 const bsl::string_view&          observerName);
    /// ```
    template <class t_VISITOR>
    void visitObservers(t_VISITOR& visitor);

    // ACCESSORS

    /// Return a shared pointer to the observer having the specified
    /// `observerName` in the registry of this broadcast observer, and an
    /// empty shared pointer if there is no such observer otherwise.
    bsl::shared_ptr<const Observer> findObserver(
                                   const bsl::string_view& observerName) const;

    /// Load into the specified `result` a shared pointer to the observer of
    /// (template parameter) `t_OBSERVER` type having the specified
    /// `observerName` in the registry of this broadcast observer, and an
    /// empty shared pointer if there is no such observer otherwise.  Return
    /// 0 if a non-empty shared pointer was loaded, and a non-zero value
    /// otherwise.  Note that an empty shared pointer will be loaded if
    /// either no observer having `observerName` is in the registry or the
    /// observer registered with that name is not of `t_OBSERVER` type.
    ///
    /// The implementation is placed here in the class definition to work
    /// around a Microsoft C++ compiler (version 16) bug where the
    /// definition cannot be matched to the declaration when an `enable_if`
    /// is used.
    template <class t_OBSERVER>
    int findObserver(bsl::shared_ptr<const t_OBSERVER> *result,
                     const bsl::string_view&            observerName,
                     typename bsl::enable_if<
                         bsl::is_convertible<const t_OBSERVER *,
                                             const ball::Observer *>::value,
                         void *>::type = 0) const
    {
        bslstl::SharedPtrUtil::dynamicCast(result, findObserver(observerName));
        return *result ? 0 : 1;
    }

    /// Return the number of observers registered with this broadcast
    /// observer.
    int numRegisteredObservers() const;

    /// Invoke the specified `visitor` functor of (template parameter)
    /// `t_VISITOR` type on each element in the registry of this broadcast
    /// observer, supplying that functor modifiable access to each observer.
    /// `visitor` must be a functor that can be called as if it had the
    /// following signature:
    /// ```
    /// void operator()(const bsl::shared_ptr<Observer>& observer,
    ///                 const bsl::string_view&          observerName);
    /// ```
    template <class t_VISITOR>
    void visitObservers(const t_VISITOR& visitor) const;
};

// ============================================================================
//                              INLINE DEFINITIONS
// ============================================================================

                         // -----------------------
                         // class BroadcastObserver
                         // -----------------------

// CREATORS
inline
BroadcastObserver::BroadcastObserver(bslma::Allocator *basicAllocator)
: d_observers(bslma::Default::allocator(basicAllocator))
{
}

// MANIPULATORS
template <class t_VISITOR>
inline
void BroadcastObserver::visitObservers(t_VISITOR& visitor)
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_rwMutex);

    for (ObserverRegistry::const_iterator it = d_observers.cbegin();
         it != d_observers.cend();
         ++it) {
        visitor(it->second, it->first);
    }
}

// ACCESSORS
inline
int BroadcastObserver::numRegisteredObservers() const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_rwMutex);

    return static_cast<int>(d_observers.size());
}

template <class t_VISITOR>
inline
void BroadcastObserver::visitObservers(const t_VISITOR& visitor) const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_rwMutex);

    for (ObserverRegistry::const_iterator it = d_observers.cbegin();
         it != d_observers.cend();
         ++it) {
        visitor(it->second, it->first);
    }
}

}  // close package namespace
}  // close enterprise namespace

#endif

// ----------------------------------------------------------------------------
// Copyright 2017 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------- END-OF-FILE ----------------------------------
